import json
import os
import shutil
import unittest

import paddle
from paddle.distributed import fleet

from fastdeploy.config import (
    CacheConfig,
    FDConfig,
    GraphOptimizationConfig,
    LoadConfig,
    ModelConfig,
    ParallelConfig,
    RoutingReplayConfig,
)
from fastdeploy.model_executor.layers.moe.moe import FusedMoE
from fastdeploy.model_executor.layers.quantization.w4afp8 import W4AFP8Config
from fastdeploy.scheduler import SchedulerConfig

# from fastdeploy.worker.worker_process import init_distributed_environment
from tests.utils import OpPerformanceTester

# Import-time side effect: make bfloat16 Paddle's default dtype for this module.
# The test below explicitly casts its gating layer back to float32 because of this.
paddle.set_default_dtype("bfloat16")


class FuseMoEWrapper(paddle.nn.Layer):
    """Holder for a ``FusedMoE`` layer built with a ``W4AFP8Config`` and random expert weights.

    Construction happens in three steps:

    1. assemble an ``FDConfig`` (kept as ``self.fd_config``) from the given
       model config and the parallelism arguments,
    2. build the ``FusedMoE`` layer (kept as ``self.fused_moe``),
    3. load the layer with random bfloat16 weights for its local experts.

    Args:
        model_config: Model configuration handed to ``FDConfig``.
        tp_size: Value used for ``tensor_parallel_size``.
        tp_rank: Value written to ``parallel_config.tensor_parallel_rank``.
        ep_size: Value used for both ``expert_parallel_size`` and
            ``data_parallel_size``.
        ep_rank: Value used for ``expert_parallel_rank``.
        prefix: Prefix of every key in the weight key map.
        nnodes: Number of ``"0"`` entries joined into the ``ips`` string.
    """

    def __init__(
        self,
        model_config: ModelConfig,
        tp_size: int = 1,
        tp_rank: int = 0,
        ep_size: int = 1,
        ep_rank: int = 0,
        prefix: str = "ernie.layers.0",
        nnodes: int = 1,
    ):
        super().__init__()
        self.model_config = model_config

        self.tp_size = tp_size
        self.ep_size = ep_size
        self.ep_rank = ep_rank

        self.prefix = prefix
        self.fd_config = self._build_fd_config(tp_rank, nnodes)
        self.fused_moe = self._build_fused_moe()
        self.pack_num = 1

        self.fused_moe.load_state_dict(self._random_expert_state_dict())

    def _build_fd_config(self, tp_rank: int, nnodes: int) -> FDConfig:
        """Create the ``FDConfig`` and apply the post-construction overrides."""
        parallel_config = ParallelConfig(
            {
                "tensor_parallel_size": self.tp_size,
                "expert_parallel_size": self.ep_size,
                "expert_parallel_rank": self.ep_rank,
                "data_parallel_size": self.ep_size,
            }
        )
        quant_config = W4AFP8Config(
            weight_scale_dict=None,
            act_scale_dict=None,
            is_permuted=False,
            hadamard_block_size=512,
            is_quantized=False,
        )
        scheduler_config = SchedulerConfig({})
        cache_config = CacheConfig({})
        graph_opt_config = GraphOptimizationConfig({})
        load_config = LoadConfig({})
        # One placeholder "0" entry per node, comma separated.
        placeholder_ips = ",".join(["0"] * nnodes)
        routing_replay_config = RoutingReplayConfig({})

        config = FDConfig(
            model_config=self.model_config,
            parallel_config=parallel_config,
            quant_config=quant_config,
            scheduler_config=scheduler_config,
            cache_config=cache_config,
            graph_opt_config=graph_opt_config,
            load_config=load_config,
            ips=placeholder_ips,
            routing_replay_config=routing_replay_config,
        )

        parallel = config.parallel_config
        parallel.tp_group = None
        parallel.tensor_parallel_rank = tp_rank
        # Re-applied after FDConfig construction, exactly as the config dict above requested.
        parallel.expert_parallel_size = self.ep_size
        if self.ep_size > 1:
            # Expert parallelism reuses fleet's model-parallel communication group.
            parallel.ep_group = fleet.get_hybrid_communicate_group().get_model_parallel_group()
            config.scheduler_config.splitwise_role = "mixed"
            config.model_config.moe_phase.phase = "decode"
        return config

    def _build_fused_moe(self) -> FusedMoE:
        """Instantiate the ``FusedMoE`` layer described by ``self.fd_config``."""
        model_cfg = self.fd_config.model_config
        expert_count = model_cfg.moe_num_experts

        key_map = {
            "gate_weight_key": f"{self.prefix}.gate.weight",
            "gate_correction_bias_key": f"{self.prefix}.moe_statics.e_score_correction_bias",
            "up_gate_proj_expert_weight_key": f"{self.prefix}.mlp.experts.{{}}.up_gate_proj.weight",
            "down_proj_expert_weight_key": f"{self.prefix}.mlp.experts.{{}}.down_proj.weight",
        }
        # All-zero float32 correction bias, one entry per expert.
        zero_bias = paddle.zeros([expert_count], paddle.float32)

        return FusedMoE(
            fd_config=self.fd_config,
            moe_intermediate_size=model_cfg.moe_intermediate_size,
            num_experts=expert_count,
            top_k=model_cfg.moe_k,
            # Original author's note: layer_idx=0 avoids invoking
            # clean_low_latency_buffer in mixed EP.
            layer_idx=0,
            weight_key_map=key_map,
            topk_method="noaux_tc",
            topk_group=4,
            n_group=8,
            gate_correction_bias=zero_bias,
        )

    def _random_expert_state_dict(self) -> dict:
        """Return random bfloat16 up-gate/down projection weights keyed per local expert."""
        layer = self.fused_moe
        local_count = layer.num_local_experts

        # Draw order matters for reproducibility under a fixed seed:
        # the up-gate tensor is sampled first, then the down projection.
        up_gate_all = paddle.randn(
            [local_count, layer.hidden_size // self.pack_num, layer.moe_intermediate_size * 2],
            paddle.bfloat16,
        )
        down_all = paddle.randn(
            [local_count, layer.moe_intermediate_size // self.pack_num, layer.hidden_size],
            paddle.bfloat16,
        )

        up_gate_key_template = layer.weight_key_map.get("up_gate_proj_expert_weight_key")
        down_key_template = layer.weight_key_map.get("down_proj_expert_weight_key")

        first_expert_id = layer.expert_id_offset
        weights = {}
        for local_idx in range(local_count):
            # Keys use the global expert id; the tensors are indexed locally.
            expert_id = first_expert_id + local_idx
            weights[up_gate_key_template.format(expert_id)] = up_gate_all[local_idx]
            weights[down_key_template.format(expert_id)] = down_all[local_idx]
        return weights


class TestW4A8FusedMoE(unittest.TestCase):
    """Smoke/benchmark test for ``FusedMoE`` configured with ``W4AFP8Config``.

    ``setUp`` writes a minimal ``config.json`` into a temporary model
    directory and builds a ``ModelConfig`` from it. The test then runs the
    fused MoE layer through ``OpPerformanceTester`` for batch sizes 1..10.
    The temporary directory is removed afterwards, including when ``setUp``
    itself fails after creating it.
    """

    def setUp(self) -> None:
        """Define the tiny model hyper-parameters and build the model config."""
        self.architectures = ["Ernie4_5_MoeForCausalLM"]
        self.hidden_size = 256
        self.moe_intermediate_size = 256
        self.moe_num_experts = 2
        self.moe_k = 2
        self.hidden_act = "silu"
        self.num_attention_heads = 56
        self.num_hidden_layers = 1

        # unittest skips tearDown() when setUp() raises, but still runs
        # registered cleanups. Register the removal before anything is written
        # so a failing ModelConfig(...) cannot leave the directory behind.
        self.model_name_or_path = None
        self.addCleanup(self._remove_tmp_model_dir)

        self.model_config = self.build_model_config()

    def build_model_config(self) -> ModelConfig:
        """Write the temporary ``config.json`` and return a ``ModelConfig`` pointing at it."""
        model_name_or_path = self.build_config_json()
        return ModelConfig(
            {
                "model": model_name_or_path,
                "max_model_len": 2048,
            }
        )

    def build_config_json(self) -> str:
        """Create the temporary model directory containing ``config.json``.

        Returns:
            Absolute path of the directory; also stored in
            ``self.model_name_or_path``.
        """
        config_dict = {
            "architectures": self.architectures,
            "hidden_size": self.hidden_size,
            "moe_intermediate_size": self.moe_intermediate_size,
            "moe_num_experts": self.moe_num_experts,
            "moe_k": self.moe_k,
            "hidden_act": self.hidden_act,
            "num_attention_heads": self.num_attention_heads,
            "num_hidden_layers": self.num_hidden_layers,
            "dtype": "bfloat16",
            "is_quantized": False,
        }

        # Record the path before touching the disk so cleanup can find the
        # directory even if writing the file fails half-way.
        self.model_name_or_path = os.path.abspath("tmp_w4afp8_moe")
        os.makedirs(self.model_name_or_path, exist_ok=True)
        config_path = os.path.join(self.model_name_or_path, "config.json")
        with open(config_path, "w", encoding="utf-8") as f:
            json.dump(config_dict, f)
        return self.model_name_or_path

    def test_fused_moe(self):
        """Benchmark the fused MoE layer on a single rank (ep_size = tp_size = 1)."""
        # init_distributed_environment()

        gating = paddle.nn.Linear(self.model_config.hidden_size, self.model_config.moe_num_experts)
        # The layer's dtype defaults to bfloat16, but the forward input is float32.
        gating.to(dtype=paddle.float32)
        gating.weight.set_value(paddle.rand(gating.weight.shape, dtype=paddle.float32))

        # Single-rank values; the distributed variants are left disabled:
        # ep_size = paddle.distributed.get_world_size()
        # ep_rank = paddle.distributed.get_rank()
        ep_size = 1
        ep_rank = 0

        tp_size = 1
        tp_rank = 0

        # ceil(ep_size / 8) -- presumably 8 ranks per node; TODO confirm.
        nnodes = (ep_size + 7) // 8

        # This line must be kept, otherwise uniformity is affected!
        paddle.seed(ep_rank + 100)

        fused_moe = FuseMoEWrapper(self.model_config, tp_size, tp_rank, ep_size, ep_rank, nnodes=nnodes).fused_moe
        # NOTE(review): the 3 / 2 factor presumably accounts for three projection
        # matrices stored at half a byte per weight -- confirm against the tester.
        weight_size = fused_moe.top_k * fused_moe.hidden_size * fused_moe.moe_intermediate_size * 3 / 2

        tester = OpPerformanceTester(
            op_name="w4afp8-moe",
            op_fn=fused_moe,
            num_layers=self.model_config.num_hidden_layers,
            weight_size=weight_size,
            gate=gating,
        )

        tester.benchmark(
            input_size=self.model_config.hidden_size,
            batch_sizes=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
        )

    def _remove_tmp_model_dir(self) -> None:
        """Delete the temporary model directory if it exists (safe to call repeatedly)."""
        path = getattr(self, "model_name_or_path", None)
        if path and os.path.isdir(path):
            print("Remove tmp model config file")
            shutil.rmtree(path)

    def tearDown(self) -> None:
        """Remove the temporary model directory after a test run.

        The same removal is registered via ``addCleanup`` in ``setUp``; this
        hook is kept for code that calls ``tearDown`` directly. The removal is
        idempotent, so running both is harmless.
        """
        self._remove_tmp_model_dir()


# Run the test cases in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
